Streaming Analyze (WebSocket)
For real-time content moderation on streaming text data, DynamoGuard also supports a WebSocket-based streaming analyze endpoint. This is particularly useful for monitoring AI model responses as they are generated, enabling immediate policy enforcement and content filtering.
WebSocket Connection
Endpoint: ws://your-domain/v1/moderation/stream/analyze
Authentication
Authentication is handled via an auth event that must be sent immediately after establishing the WebSocket connection.
Client Events (Events You Send)
1. auth - Authenticate Connection
Purpose: Authenticates the WebSocket connection using your API key.
Data Structure:
{
  "token": "string (your-api-key)"
}
Required Fields:
- token: Your DynamoAI API key
Server Response:
- ✅ Acknowledgment: client-info event (indicates successful authentication)
- ❌ Error: error event if authentication fails
2. start - Initialize Streaming Session
Purpose: Establishes a new streaming session for content analysis.
Data Structure:
{
  "messages": [
    {
      "role": "user",
      "content": "string"
    }
  ],
  "policyIds": ["string (24-character hex)"],
  "modelId": "string (24-character hex)"
}
Required Fields:
- messages: Array with a single message object containing the input prompt
- policyIds: Array of policy IDs to apply for moderation
- modelId: Model ID to associate with the session
Server Response:
- ✅ Acknowledgment: session_start event with { started: true }
- ❌ Error: WsException if session already exists or validation fails
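Because validation failures surface only after the start event is sent, it can help to check the payload on the client first. The following is a minimal sketch, assuming the only client-checkable constraints are the ones listed above (a single message and 24-character hex IDs). The function name build_start_payload is hypothetical, and requiring at least one policy ID is an assumption.

```python
import re
from typing import Dict, List

# 24-character hexadecimal string, as stated for policyIds and modelId.
HEX_ID = re.compile(r"[0-9a-fA-F]{24}")


def build_start_payload(prompt: str, policy_ids: List[str], model_id: str) -> Dict:
    """Build the data object for a start event, rejecting malformed IDs early."""
    if not policy_ids:
        # Assumption: at least one policy is expected for moderation.
        raise ValueError("at least one policy ID is required")
    bad = [i for i in [*policy_ids, model_id] if not HEX_ID.fullmatch(i)]
    if bad:
        raise ValueError(f"expected 24-character hex IDs, got: {bad}")
    return {
        "messages": [{"role": "user", "content": prompt}],
        "policyIds": list(policy_ids),
        "modelId": model_id,
    }


payload = build_start_payload(
    "Summarize this report.",
    ["68890c1246e0b250a324af8b"],
    "66f243e15a47dc8bdbb82504",
)
```

The server remains the source of truth for validation; this check only catches obvious formatting mistakes before a round trip.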
3. analyze - Stream Text for Analysis
Purpose: Sends text chunks for real-time analysis against configured policies.
Data Structure:
{
  "text": "string"
}
Required Fields:
- text: The text chunk to analyze
Server Response:
- ✅ Analysis Result: analyze_result event
- ❌ Error: WsException if session not found
4. end - Terminate Streaming Session
Purpose: Ends the current streaming session and logs final results.
Data Structure:
{}
Server Response:
- ✅ Acknowledgment: session_end event with session summary
- ❌ Error: WsException if session not found
Server Events (Events You Receive)
1. client-info - Authentication Confirmation
Triggered: When an auth event is successfully processed.
Data Structure:
{
  "authenticated": true
}
2. session_start - Session Initialization Confirmation
Triggered: When a start event is successfully processed.
Data Structure:
{
  "started": true
}
3. analyze_result - Policy Analysis Results
Triggered: In response to each analyze event. Moderation is performed only once the buffer threshold is reached; until then, the API returns NONE as the policy action.
Data Structure:
{
  "text": "string",
  "finalAction": "BLOCK|WARN|REDACT|SANITIZE|NONE",
  "policyResults": [
    {
      "policyId": "string",
      "workerRequestId": "string",
      "outputs": "object",
      "action": "BLOCK|WARN|REDACT|SANITIZE|NONE",
      "violated": "boolean"
    }
  ],
  "numTokens": "number",
  "warning": "string (optional)"
}
Fields:
- text: The analyzed text chunk
- finalAction: Overall action determined by all policies
- policyResults: Individual results for each applied policy
- numTokens: Number of tokens analyzed in this chunk
- warning: Token limit warning (if applicable)
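To make the analyze_result fields easier to consume, the sketch below parses the data object into a small typed record and collects the IDs of violated policies. The AnalyzeResult class and parse_analyze_result function are hypothetical names, and the sample payload is invented purely for illustration.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class AnalyzeResult:
    text: str
    final_action: str
    violated_policy_ids: List[str]
    num_tokens: int
    warning: Optional[str] = None


def parse_analyze_result(data: dict) -> AnalyzeResult:
    """Convert the data object of an analyze_result event into a typed record."""
    violated = [
        r["policyId"] for r in data.get("policyResults", []) if r.get("violated")
    ]
    return AnalyzeResult(
        text=data["text"],
        final_action=data["finalAction"],
        violated_policy_ids=violated,
        num_tokens=data["numTokens"],
        warning=data.get("warning"),  # optional field
    )


# Invented sample payload following the structure documented above.
sample = {
    "text": "call me at 555-0100",
    "finalAction": "BLOCK",
    "policyResults": [
        {"policyId": "68890c1246e0b250a324af8b", "workerRequestId": "w1",
         "outputs": {}, "action": "BLOCK", "violated": True},
    ],
    "numTokens": 5,
}
result = parse_analyze_result(sample)
```

A caller would typically stop forwarding the model's output once result.final_action is BLOCK, as the example client on this page does.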
4. session_end - Session Termination Confirmation
Triggered: When an end event is successfully processed.
Data Structure:
{
  "ended": true,
  "numTokens": "number",
  "warning": "string (optional)"
}
Fields:
- ended: Confirmation that session ended
- numTokens: Total tokens analyzed in the session
- warning: Token limit warning (if applicable)
5. session_reset - Session Reset Notification
Triggered: When a new WebSocket connection is established (existing sessions are cleared).
Data Structure:
{
  "reset": true
}
6. error - Error Notification
Triggered: When an error occurs during processing.
Data Structure:
{
  "message": "string",
  "code": "string (optional)"
}
Error Handling
WebSocket Exceptions
The server may send error events for various error conditions:
- Authentication Errors:
  - Invalid API key
  - Missing authentication
- Session Errors:
  - StreamingSessionError: Session not found or already exists
  - TokenLimitError: Input exceeds token limits
- Validation Errors:
  - Invalid message format
  - Missing required fields
  - Invalid policy IDs
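One way to surface these conditions in client code is to turn every incoming error event into an exception. The sketch below assumes incoming frames use the same {"event": ..., "data": ...} envelope as the example client on this page. The names ModerationStreamError and raise_on_error are hypothetical, and no assumption is made about which values the optional code field carries.

```python
from typing import Optional


class ModerationStreamError(Exception):
    """Raised when the server sends an error event (hypothetical client-side type)."""

    def __init__(self, message: str, code: Optional[str] = None):
        super().__init__(message)
        self.message = message
        self.code = code  # optional, may be None


def raise_on_error(frame: dict) -> dict:
    """Return the frame unchanged, or raise if it is an error event."""
    if frame.get("event") == "error":
        data = frame.get("data") or {}
        raise ModerationStreamError(
            data.get("message", "unknown error"), data.get("code")
        )
    return frame
```

Wrapping each decoded frame in raise_on_error keeps the error path in one place, so the rest of the client only handles successful events.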
Usage Flow
- Connect to the WebSocket endpoint
- Send auth event with your API key
- Receive client-info confirmation
- Send start event to initialize session
- Receive session_start confirmation
- Send analyze events with text chunks and receive analyze_result as response for each of them
- Send end event to terminate session
- Receive session_end confirmation with final summary
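The ordering above can be sketched as a small client-side state machine that rejects events sent out of sequence. The state names and the next_state and validate_sequence helpers are hypothetical. Returning to the authenticated state after end (so that another session could be started on the same connection) is an assumption not confirmed by this page.

```python
from typing import Iterable

# (current state, client event) -> next state
TRANSITIONS = {
    ("connected", "auth"): "authenticated",
    ("authenticated", "start"): "in_session",
    ("in_session", "analyze"): "in_session",
    # Assumption: the connection stays authenticated once a session ends.
    ("in_session", "end"): "authenticated",
}


def next_state(state: str, event: str) -> str:
    """Return the state reached by sending `event`, or raise if it is out of order."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"cannot send {event!r} while {state!r}") from None


def validate_sequence(events: Iterable[str]) -> str:
    """Replay a sequence of client events starting from a fresh connection."""
    state = "connected"
    for event in events:
        state = next_state(state, event)
    return state


final = validate_sequence(["auth", "start", "analyze", "analyze", "end"])
```

A check like this is useful in tests or wrappers to catch mistakes such as calling analyze before start, which the server would otherwise report as a session error.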
Important Notes
Authentication
- Authentication must be performed immediately after WebSocket connection
- The auth event must be sent before any other operations
- Connection will be rejected if authentication fails
Token Buffering
- Text is buffered until it reaches the default threshold (20 tokens)
- Policies are only applied once the buffer threshold is reached
- This reduces API calls while maintaining real-time analysis
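To illustrate the buffering idea only, the sketch below accumulates chunks and releases text for analysis once a threshold is reached. It is not the server's implementation: the TokenBuffer class is hypothetical, whitespace splitting stands in for the real tokenizer, and clearing the buffer after each release is an assumption.

```python
from typing import List, Optional


class TokenBuffer:
    """Toy model of threshold-based buffering (illustrative, not the server's code)."""

    def __init__(self, threshold: int = 20):
        self.threshold = threshold
        self._tokens: List[str] = []

    def add(self, chunk: str) -> Optional[str]:
        """Buffer a chunk; return the buffered text once the threshold is reached."""
        # Assumption: whitespace-separated words approximate tokens.
        self._tokens.extend(chunk.split())
        if len(self._tokens) < self.threshold:
            return None  # corresponds to an analyze_result with action NONE
        text = " ".join(self._tokens)
        self._tokens.clear()  # assumption: the buffer restarts after analysis
        return text


buf = TokenBuffer(threshold=3)
first = buf.add("a b")    # below threshold, nothing to analyze yet
second = buf.add("c d")   # threshold reached, buffered text is released
```

This is why most analyze_result events for short chunks report NONE: a policy decision can only appear on the chunk that fills the buffer.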
Token Limits
- Input messages are validated against policy token limits
- Only the first N tokens (as per policy limits) are analyzed
- Warnings are provided when token limits are exceeded
Session Management
- Each WebSocket connection can have only one active session
- Sessions are automatically cleaned up on connection close
- New connections reset any existing sessions
Policy Actions
- BLOCK: Content violates policy, should be blocked
- WARN: Content may be problematic, warning recommended
- REDACT: Sensitive content should be redacted
- SANITIZE: Content should be cleaned/sanitized
- NONE: No policy violations detected
Example Implementation
Basic WebSocket Client
import json
import logging
from typing import Any, Iterator

import certifi
from openai import OpenAI
from websocket import WebSocket, create_connection

logger = logging.getLogger("stream_example")
logger.setLevel("INFO")

# Placeholders: replace these with your own values.
WS_API_URL = "<dynamoai-url>"  # WebSocket base URL, without the endpoint path
DYNAMO_API_KEY = "<dynamoai-api-token>"
OPENAI_API_KEY = "<openai-key>"
POLICY_IDS = ["68890c1246e0b250a324af8b"]
MODEL_ID = "66f243e15a47dc8bdbb82504"
OPENAI_MODEL = "mistral-large-latest"


def send_ws_msg(ws: WebSocket, event: str, data: dict):
    # Every client event is sent as a JSON envelope: {"event": ..., "data": ...}
    msg = json.dumps({"event": event, "data": data})
    logger.debug(msg)
    ws.send(msg)


def recv_ws_msg(ws: WebSocket, target_event: str) -> Any:
    # Read frames until the expected event arrives; raise on an error event.
    while True:
        response = json.loads(ws.recv())
        logger.debug(response)
        if response["event"] == target_event:
            return response["data"]
        elif response["event"] == "error":
            raise ValueError(response)


def create_ws_connection() -> WebSocket:
    ws = create_connection(
        f"{WS_API_URL}/v1/moderation/stream/analyze",
        sslopt={"ca_certs": certifi.where()},
    )
    # The auth event must be the first message on a new connection.
    logger.debug("Authorizing")
    send_ws_msg(ws, "auth", {"token": DYNAMO_API_KEY})
    recv_ws_msg(ws, "client-info")
    logger.debug("Authorized")
    return ws


def start_session(ws: WebSocket, input_prompt: str):
    send_ws_msg(
        ws,
        "start",
        {
            "messages": [{"role": "user", "content": input_prompt}],
            "policyIds": POLICY_IDS,
            "modelId": MODEL_ID,
        },
    )
    recv_ws_msg(ws, "session_start")


def analyze(ws: WebSocket, output_chunk: str) -> str:
    # Send one chunk and return the overall action for it.
    send_ws_msg(ws, "analyze", {"text": output_chunk})
    data = recv_ws_msg(ws, "analyze_result")
    return data["finalAction"]


def end_session(ws: WebSocket):
    send_ws_msg(ws, "end", {})
    recv_ws_msg(ws, "session_end")


# The OpenAI SDK client is pointed at the Mistral API base URL here,
# so OPENAI_API_KEY must be a key that is valid for that endpoint.
client = OpenAI(api_key=OPENAI_API_KEY, base_url="https://api.mistral.ai/v1")


def guarded_chat(ws: WebSocket, prompt: str) -> Iterator[str]:
    start_session(ws, prompt)
    response = client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
        stream=True,
    )
    for chunk in response:
        if not chunk.choices:
            continue  # skip chunks that carry no choices
        choice = chunk.choices[0]
        message = choice.delta.content or ""
        end_of_stream = bool(choice.finish_reason)
        if not end_of_stream:
            action = analyze(ws, message)
            if action == "BLOCK":
                # Stop streaming as soon as a policy blocks the output.
                yield f"{message}[BLOCKED]"
                break
        yield message
    end_session(ws)


def chat(prompt: str):
    ws = create_ws_connection()
    try:
        for chunk in guarded_chat(ws, prompt):
            print(chunk, end="")
    finally:
        ws.close()  # release the connection even if streaming fails


chat(
    "Write a draft email for a credit score report update. It should start generic and then add details like fake names, phone numbers, addresses, etc."
)
Key Features
- Token Buffering: Text is buffered until it reaches the default threshold (20 tokens), reducing API calls while maintaining real-time analysis
- Real-time Analysis: Policies are applied as text streams in, enabling immediate content filtering
- Session Management: Each WebSocket connection can manage one active session with automatic cleanup
- Comprehensive Error Handling: Built-in error handling for various failure scenarios